#include <stdio.h>
#include <stdlib.h>
#include <assert.h>


#include <Winsock2.h>
#include <windows.h>
#include <process.h>

#include "zmisc_cache.h"
#include "socket_msg.h"

#include "worker_thread.h"

// Tag stored in worker_over_lapped::em_evt identifying which overlapped
// operation a completion packet belongs to.
enum struct service_event
{
    // Set by iocp_recv_msg before posting WSARecv.
    evt_recved = 0x10,
    // Set by iocp_send_msg before posting WSASend (implicitly 0x11).
    evt_sended,
};

// State shared by all worker threads of one completion-port pool.
// Created by start_work_thread and released by stop_work_thread.
struct iocp_worker
{
    // Polled by each worker loop; workers run while it is 0.
    // stop_work_thread sets it to -1.
    long volatile               lv_exit_flag;
    // Number of slots in h_threads.
    unsigned long               ul_workers;
    // Array of thread handles returned by _beginthreadex.
    HANDLE                      *h_threads;
    // Completion port all attached sockets are associated with.
    HANDLE                      h_worker_iocp;
    // Opaque zmisc cache that worker_over_lapped records are allocated from.
    void                        *p_over_lapped_cache;

    // Called by a worker thread once a complete message has been received.
    // pc_msg_data points at the record's c_msg buffer. A non-zero return makes
    // the worker close the connection; on zero the same buffer is sent back,
    // with its length read from socket_msg_response::ui_msg_len.
    int (*pfn_on_recv_msg)( void *p_worker, char *pc_msg_data );
};

// Per-operation record handed to WSARecv/WSASend and received back from
// GetQueuedCompletionStatus.
struct worker_over_lapped
{
    // Must remain the first member: pointers to this struct are cast to
    // OVERLAPPED* and back.
    OVERLAPPED          st_ovlp;
    // Which operation (receive or send) is currently in flight.
    enum service_event  em_evt;
    // Total number of bytes the current message should reach in c_msg.
    unsigned int        ui_data_len;
    // Number of bytes of the current message transferred so far.
    unsigned int        ui_trans_len;
    // Message buffer; the first sizeof(unsigned int) bytes of a received
    // message are read as its total length.
    char                c_msg[MAX_SOCKET_MSG_LEN];
};



// Posts an overlapped receive on s_socket.
//
// p_worker        - the struct iocp_worker that owns the overlapped cache.
// s_socket        - socket to read from.
// pst_worker_ovlp - record to continue filling, or null to start a new
//                   message: a record is then taken from the cache and armed
//                   to read the sizeof(unsigned int) length header.
//
// Returns 0 when the receive completed immediately or is pending (in both
// cases the result is delivered through the completion port), -1 otherwise.
// On failure the record (if any) is returned to the cache and the socket is
// closed, so the caller must not touch either afterwards.
int iocp_recv_msg( void *p_worker, SOCKET s_socket, struct worker_over_lapped *pst_worker_ovlp)
{
    int                       i_result         = -1;
    struct iocp_worker        *pst_worker      = (struct iocp_worker*)p_worker;
    WSABUF                    st_wsa_buffer    = {0};
    DWORD                     dw_len           = 0;
    DWORD                     dw_flag          = 0;

    for ( ; ; )
    {
        if ( INVALID_SOCKET == s_socket )
        {
            break;
        }

        if ( (struct worker_over_lapped*)0 == pst_worker_ovlp )
        {
            // New message: take a fresh record and ask for the length header only.
            pst_worker_ovlp = (struct worker_over_lapped *)zmisc_alloc_cache_data( pst_worker->p_over_lapped_cache );
            if ( (struct worker_over_lapped*)0 == pst_worker_ovlp )
            {
                break;
            }
            memset( pst_worker_ovlp, 0, sizeof(struct worker_over_lapped) );
            pst_worker_ovlp->ui_data_len = (unsigned int)sizeof(unsigned int);
            st_wsa_buffer.len            = (unsigned int)sizeof(unsigned int);
        }
        else
        {
            // Continuation: only the OVERLAPPED part is reset, the progress
            // counters and already received bytes are kept.
            memset(&pst_worker_ovlp->st_ovlp, 0, sizeof(pst_worker_ovlp->st_ovlp));
            st_wsa_buffer.len            = pst_worker_ovlp->ui_data_len - pst_worker_ovlp->ui_trans_len;
        }

        pst_worker_ovlp->em_evt      = service_event::evt_recved;
        st_wsa_buffer.buf            = pst_worker_ovlp->c_msg + pst_worker_ovlp->ui_trans_len;

        if ( SOCKET_ERROR != WSARecv( s_socket, &st_wsa_buffer, 1, &dw_len, &dw_flag, (OVERLAPPED *)pst_worker_ovlp, (LPWSAOVERLAPPED_COMPLETION_ROUTINE)0 ) )
        {
            i_result = 0;
            break;
        }
        // ERROR_IO_PENDING has the same value as WSA_IO_PENDING.
        if ( ERROR_IO_PENDING != WSAGetLastError() )
        {
            break;
        }

        i_result = 0;
        break;
    }

    if ( 0 != i_result )
    {
        if ( (struct worker_over_lapped*)0 != pst_worker_ovlp )
        {
            zmisc_free_cache_data( pst_worker->p_over_lapped_cache, pst_worker_ovlp);
        }
        // Close the socket on every failure, including a failed record
        // allocation; otherwise a socket with no pending I/O would be left
        // open with nobody responsible for it.
        if ( INVALID_SOCKET != s_socket )
        {
            closesocket( s_socket );
        }
    }

    return i_result;
}


int iocp_send_msg( void *p_worker, SOCKET s_socket, struct worker_over_lapped  *pst_worker_ovlp )
{
    int                        i_result         = -1;
    struct iocp_worker         *pst_worker      = (struct iocp_worker*)p_worker;
    WSABUF                     st_wsa_buffer    = {0};
    DWORD                      dw_len           = 0;


    for ( ; ; )
    {
        if ( INVALID_SOCKET == s_socket )
        {
            break;
        }

        if ( (struct worker_over_lapped*)0 == pst_worker_ovlp )
        {
            break;
        }

        memset(&pst_worker_ovlp->st_ovlp, 0, sizeof(pst_worker_ovlp->st_ovlp));
        pst_worker_ovlp->em_evt      = service_event::evt_sended;
        st_wsa_buffer.buf            = pst_worker_ovlp->c_msg + pst_worker_ovlp->ui_trans_len;
        st_wsa_buffer.len            = pst_worker_ovlp->ui_data_len - pst_worker_ovlp->ui_trans_len;

        if ( SOCKET_ERROR != WSASend( s_socket, &st_wsa_buffer, 1, &dw_len, 0, (OVERLAPPED *)pst_worker_ovlp, (LPWSAOVERLAPPED_COMPLETION_ROUTINE)0 ) )
        {
            i_result = 0;
            break;
        }
        if ( ERROR_IO_PENDING != WSAGetLastError() )
        {
            break;
        }

        i_result = 0;
        break;
    }

    if ( 0 != i_result )
    {
        if ( (struct worker_over_lapped*)0 != pst_worker_ovlp )
        {
            zmisc_free_cache_data( pst_worker->p_over_lapped_cache, pst_worker_ovlp);
            closesocket( s_socket );
        }
    }

    return i_result;
}


// Worker thread entry point (started through _beginthreadex).
//
// p_void is the struct iocp_worker shared by the pool. The thread dequeues
// completion packets from the worker's completion port until lv_exit_flag
// becomes non-zero. For every packet the completion key is the socket and
// the OVERLAPPED pointer is the worker_over_lapped record of the operation.
//
// Per-connection flow:
//   receive header -> receive body -> pfn_on_recv_msg -> send response
//   -> receive next header ...
// A failed operation, a 0-byte completion, an oversized header length or a
// non-zero callback result closes the socket and releases the record.
static unsigned __stdcall service_work_thread( void* p_void )
{
    struct iocp_worker*        pst_worker         = (struct iocp_worker*)p_void;
    DWORD                      dw_number_of_bytes = 0;
    ULONG_PTR                  ulp_completion_key = 0;
    struct worker_over_lapped* pst_worker_ovlp    = (struct worker_over_lapped*)0;
    BOOL                       b_io_ok            = FALSE;

    for ( ; 0 == pst_worker->lv_exit_flag; )
    {
        pst_worker_ovlp    = (struct worker_over_lapped*)0;
        dw_number_of_bytes = 0;
        ulp_completion_key = 0;

        b_io_ok = GetQueuedCompletionStatus( pst_worker->h_worker_iocp, &dw_number_of_bytes, &ulp_completion_key, (OVERLAPPED **)&pst_worker_ovlp, INFINITE );

        if ( (struct worker_over_lapped*)0 == pst_worker_ovlp )
        {
            // Either nothing was dequeued, or this is the wake-up packet
            // posted by stop_work_thread; re-check the exit flag.
            continue;
        }

        if ( FALSE == b_io_ok || 0 == dw_number_of_bytes )
        {
            // FALSE with a record means the operation itself failed; 0 bytes
            // means nothing was transferred. Either way the connection is
            // finished. The record and socket must be released here, nobody
            // else will see this packet again.
            printf( "close socket(%d)\n", (int)ulp_completion_key );
            closesocket( (SOCKET)ulp_completion_key );
            zmisc_free_cache_data( pst_worker->p_over_lapped_cache, pst_worker_ovlp);
            continue;
        }

        pst_worker_ovlp->ui_trans_len += dw_number_of_bytes;

        switch( pst_worker_ovlp->em_evt )
        {
            case service_event::evt_recved:
            {
                if (pst_worker_ovlp->ui_trans_len == sizeof(unsigned int) )
                {
                    // Header complete: it carries the total message length
                    // (header included, since it is compared to ui_trans_len).
                    // NOTE(review): a length below sizeof(unsigned int) is not
                    // rejected here and reaches the callback - confirm whether
                    // the protocol allows that.
                    pst_worker_ovlp->ui_data_len = *((unsigned int *)pst_worker_ovlp->c_msg);
                    if ( pst_worker_ovlp->ui_data_len > MAX_SOCKET_MSG_LEN )
                    {
                        printf( "max msg size(%u, %u) recv\n", (unsigned int)sizeof(pst_worker_ovlp->c_msg), (unsigned int)pst_worker_ovlp->ui_data_len );
                        closesocket( (SOCKET)ulp_completion_key );
                        zmisc_free_cache_data( pst_worker->p_over_lapped_cache, pst_worker_ovlp);
                        continue;
                    }
                }

                if ( pst_worker_ovlp->ui_data_len > pst_worker_ovlp->ui_trans_len )
                {
                    // Message still incomplete: keep reading into the same record.
                    iocp_recv_msg( pst_worker, (SOCKET)ulp_completion_key, pst_worker_ovlp );
                    continue;
                }

                if ( 0 != pst_worker->pfn_on_recv_msg( pst_worker, pst_worker_ovlp->c_msg ) )
                {
                    printf( "close socket(%d)\n", (int)ulp_completion_key );
                    closesocket( (SOCKET)ulp_completion_key );
                    zmisc_free_cache_data( pst_worker->p_over_lapped_cache, pst_worker_ovlp);
                }
                else
                {
                    // The callback left the response in c_msg; send it back
                    // using the length stored in the response header.
                    pst_worker_ovlp->ui_trans_len = 0;
                    pst_worker_ovlp->ui_data_len  = ((struct socket_msg_response *)pst_worker_ovlp->c_msg)->ui_msg_len;
                    iocp_send_msg( pst_worker, (SOCKET)ulp_completion_key, pst_worker_ovlp );
                }
                break;
            }
            case service_event::evt_sended:
            {
                if ( pst_worker_ovlp->ui_data_len > pst_worker_ovlp->ui_trans_len )
                {
                    // Partial send: push out the remaining bytes.
                    iocp_send_msg( pst_worker, (SOCKET)ulp_completion_key, pst_worker_ovlp );
                }
                else
                {
                    // Response fully sent: reuse the record to read the next
                    // message header.
                    pst_worker_ovlp->ui_trans_len = 0;
                    pst_worker_ovlp->ui_data_len  = sizeof( unsigned int);
                    iocp_recv_msg( pst_worker, (SOCKET)ulp_completion_key, pst_worker_ovlp );
                }
                break;
            }
            default:
            {
                assert(0);
                break;
            }
        }
    }

    _endthreadex( 0 );
    return 0;
}



// Stops all worker threads of a pool and releases the pool object.
//
// p_worker - value returned by start_work_thread (or a partially built
//            struct iocp_worker during start-up failure).
//
// Returns -1 when p_worker is null, 0 otherwise. After a 0 return the
// pointer must not be used again.
//
// NOTE(review): p_over_lapped_cache is not released here; no cache
// teardown routine is visible from this file - confirm whether zmisc
// provides one.
int stop_work_thread( void* p_worker )
{
    unsigned long        ul_worker_index  = 0;
    struct iocp_worker   *pst_worker      = (struct iocp_worker *)p_worker;

    if ( (struct iocp_worker*)0 == pst_worker )
    {
        return -1;
    }

    if ( (HANDLE*)0 != pst_worker->h_threads )
    {
        pst_worker->lv_exit_flag = -1;

        // One wake-up packet per worker so that threads blocked in
        // GetQueuedCompletionStatus get to re-check the exit flag.
        if ( (HANDLE)0 != pst_worker->h_worker_iocp )
        {
            for ( ul_worker_index = 0; ul_worker_index < pst_worker->ul_workers; ++ul_worker_index )
            {
                PostQueuedCompletionStatus( pst_worker->h_worker_iocp, 0, 0, (LPOVERLAPPED)0 );
            }
        }

        for ( ul_worker_index = 0; ul_worker_index < pst_worker->ul_workers; ++ul_worker_index )
        {
            // Slots stay null when thread creation failed part-way through
            // start_work_thread.
            if ( (HANDLE)0 == pst_worker->h_threads[ ul_worker_index] )
            {
                continue;
            }
            WaitForSingleObject( pst_worker->h_threads[ ul_worker_index], INFINITE);
            CloseHandle( pst_worker->h_threads[ ul_worker_index] );
            pst_worker->h_threads[ ul_worker_index] = (HANDLE)0;
        }

        free( pst_worker->h_threads );
        pst_worker->h_threads = (HANDLE*)0;
    }

    // Closed in every case: the port can exist without a thread array when
    // start-up failed after the port was created.
    if ( (HANDLE)0 != pst_worker->h_worker_iocp )
    {
        CloseHandle( pst_worker->h_worker_iocp );
        pst_worker->h_worker_iocp = (HANDLE)0;
    }

    free( pst_worker );

    return 0;
}


// Creates a completion-port worker pool.
//
// pfn_on_recv_msg - callback the worker threads invoke for every complete
//                   message; must not be null because the workers call it
//                   unconditionally.
//
// Builds the overlapped-record cache, the completion port and
// (number of processors + 2) worker threads.
//
// Returns an opaque pool pointer to pass to iocp_attach / stop_work_thread,
// or null on any failure (everything built so far is torn down through
// stop_work_thread).
void * start_work_thread( int (*pfn_on_recv_msg)( void *p_worker, char *pc_msg_data ) )
{
    int                         i_successed      = -1;
    struct iocp_worker*         pst_worker       = (struct iocp_worker*)0;
    SYSTEM_INFO                 st_system_info   = { 0 };
    unsigned long               ul_thread_count  = 0;
    unsigned long               ul_thread_index  = 0;
    unsigned int                ui_thread_id     = 0;

    // A null callback would only crash later, inside a worker thread.
    if ( 0 == pfn_on_recv_msg )
    {
        return (void*)0;
    }

    for ( ; ; )
    {
        // calloc leaves every member zero/null, which stop_work_thread
        // relies on when unwinding a partial start.
        pst_worker = (struct iocp_worker*)calloc( 1, sizeof(struct iocp_worker) );
        if ( (struct iocp_worker*)0 == pst_worker )
        {
            break;
        }

        pst_worker->pfn_on_recv_msg = pfn_on_recv_msg;

        // NOTE(review): presumably 1024 is the number of records in the cache -
        // confirm against zmisc_init_cache.
        pst_worker->p_over_lapped_cache = zmisc_init_cache( 1024, sizeof(struct worker_over_lapped) );
        if ((void*)0 == pst_worker->p_over_lapped_cache)
        {
            break;
        }

        pst_worker->h_worker_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, (HANDLE)0, 0, 0);
        if ( (HANDLE)0 == pst_worker->h_worker_iocp )
        {
            break;
        }

        GetSystemInfo( &st_system_info );
        ul_thread_count = st_system_info.dwNumberOfProcessors + 2;

        pst_worker->h_threads = (HANDLE*)calloc( ul_thread_count, sizeof(HANDLE) );
        if ( (HANDLE*)0 == pst_worker->h_threads )
        {
            break;
        }
        pst_worker->ul_workers = ul_thread_count;

        i_successed = 0;
        for ( ul_thread_index = 0; ul_thread_index < ul_thread_count; ++ul_thread_index )
        {
            pst_worker->h_threads[ul_thread_index] = (HANDLE)_beginthreadex( (void *)0, 0, service_work_thread, pst_worker, 0, &ui_thread_id );
            if ((HANDLE)0== pst_worker->h_threads[ul_thread_index] )
            {
                i_successed = -1;
                break;
            }
        }

        break;
    }

    if ( 0 != i_successed )
    {
        // Accepts null and partially initialised workers.
        stop_work_thread( pst_worker );
        pst_worker = (struct iocp_worker*)0;
    }

    return pst_worker;
}



// Associates s_socket with the pool's completion port (the socket value is
// used as completion key) and posts the first receive for it.
//
// Returns -1 when p_worker is null, -2 when the pool has no workers, -3 when
// the association fails, 0 otherwise. The outcome of the initial
// iocp_recv_msg call is not reflected in the return value.
int iocp_attach( void* p_worker, SOCKET s_socket )
{
    struct iocp_worker *pst_pool = (struct iocp_worker *)p_worker;
    int                 i_status = 0;

    if ( (struct iocp_worker*)0 == pst_pool )
    {
        i_status = -1;
    }
    else if ( 0 == pst_pool->ul_workers )
    {
        i_status = -2;
    }
    else if ( (HANDLE)0 == CreateIoCompletionPort( (HANDLE)s_socket, pst_pool->h_worker_iocp, (ULONG_PTR)s_socket, 0) )
    {
        i_status = -3;
    }
    else
    {
        // A null record asks iocp_recv_msg to allocate one and start with
        // the length header.
        iocp_recv_msg( pst_pool, s_socket, (struct worker_over_lapped*)0 );
    }

    return i_status;
}